Extract fusion into GraphPass plugin API #763

Draft
antiguru wants to merge 10 commits into TimelyDataflow:master from antiguru:graph-pass-api

@antiguru
Member

Refactors operator fusion out of subgraph.rs into a pluggable GraphPass trait, separating fusion concerns from progress tracking.

  • SubgraphBuilder gains add_graph_pass() and runs registered passes in build() before reachability setup
  • All fusion logic (~640 lines) moves to progress/fusion.rs implementing the GraphPass trait
  • FusionPass registered automatically when fuse_chain_length >= 2
  • Tombstone mechanism (forward_to, HashSet scheduling) remains as a general graph-pass concern
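The shape described in the list above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this PR: `Child`, `Edge`, `Builder`, the `apply` signature, and the `TombstoneNoops` pass are all hypothetical stand-ins, and only the names `GraphPass`, `add_graph_pass()`, and `build()` come from the description.

```rust
/// Hypothetical stand-in for the per-operator state a pass may rewrite.
#[derive(Debug, Clone, PartialEq)]
struct Child {
    name: String,
    /// `true` once a pass has replaced this operator with a tombstone.
    tombstone: bool,
}

/// An edge from (source child, output port) to (target child, input port).
type Edge = ((usize, usize), (usize, usize));

/// A pass receives the children and edges and may transform them in place.
/// (The method signature is an assumption made for this sketch.)
trait GraphPass {
    fn name(&self) -> &str;
    fn apply(&mut self, children: &mut Vec<Child>, edges: &mut Vec<Edge>);
}

/// Hypothetical, heavily reduced builder.
#[derive(Default)]
struct Builder {
    children: Vec<Child>,
    edges: Vec<Edge>,
    graph_passes: Vec<Box<dyn GraphPass>>,
}

impl Builder {
    fn add_graph_pass(&mut self, pass: Box<dyn GraphPass>) {
        self.graph_passes.push(pass);
    }

    /// Runs the registered passes in registration order, then returns the
    /// transformed graph. The real build() would go on to set up reachability.
    fn build(self) -> (Vec<Child>, Vec<Edge>) {
        let Builder { mut children, mut edges, graph_passes } = self;
        for mut pass in graph_passes {
            pass.apply(&mut children, &mut edges);
        }
        (children, edges)
    }
}

/// Toy pass: tombstones every child whose name starts with "noop".
struct TombstoneNoops;

impl GraphPass for TombstoneNoops {
    fn name(&self) -> &str {
        "tombstone-noops"
    }

    fn apply(&mut self, children: &mut Vec<Child>, _edges: &mut Vec<Edge>) {
        for child in children.iter_mut() {
            if child.name.starts_with("noop") {
                child.tombstone = true;
            }
        }
    }
}
```

Tombstoning in place, rather than removing entries, keeps child indices stable, which matches the note in a later commit that tombstoned children stay in the reachability builder to preserve index positions.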

Alternative to the graph IR approach in #TODO.

🤖 Generated with Claude Code

antiguru and others added 9 commits March 13, 2026 21:01
During subgraph construction, detect maximal chains of pipeline-connected
operators (single input/output, local, notify=false, no fan-in/fan-out)
and replace each chain with a single ChainScheduler that schedules
members sequentially. Intermediate pointstamps are hidden from the
reachability tracker, reducing progress tracking overhead.

Configurable via WorkerConfig::min_chain_length (default: 2, 0 disables).
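As an illustration of the chain detection described in this commit, here is a small sketch over plain indices. The `Op` struct, the `fusible` flag (standing in for the local and notify checks), and the function signature are assumptions made for the example; the actual implementation works on the subgraph's operator state.

```rust
/// Hypothetical description of one operator for chain detection.
struct Op {
    inputs: usize,
    outputs: usize,
    /// Stand-in for the per-operator checks (local, notify=false, ...).
    fusible: bool,
}

/// Returns maximal chains (as operator indices) with at least `min_len`
/// members. `edges` holds (source operator, target operator) pairs.
/// A `min_len` of 0 disables detection, mirroring the config described above.
fn detect_chains(ops: &[Op], edges: &[(usize, usize)], min_len: usize) -> Vec<Vec<usize>> {
    if min_len == 0 {
        return Vec::new();
    }
    let n = ops.len();
    let mut out_deg = vec![0usize; n];
    let mut in_deg = vec![0usize; n];
    for &(s, t) in edges {
        out_deg[s] += 1;
        in_deg[t] += 1;
    }

    let eligible = |i: usize| ops[i].fusible && ops[i].inputs == 1 && ops[i].outputs == 1;

    // next[s] = Some(t) when s -> t is a chain link: both ends are eligible,
    // s has no other consumer (no fan-out) and t no other producer (no fan-in).
    let mut next: Vec<Option<usize>> = vec![None; n];
    let mut has_prev = vec![false; n];
    for &(s, t) in edges {
        if eligible(s) && eligible(t) && out_deg[s] == 1 && in_deg[t] == 1 {
            next[s] = Some(t);
            has_prev[t] = true;
        }
    }

    // Walk forward from every chain head (a linked node with no predecessor).
    // Pure cycles have no head and are therefore never walked.
    let mut chains = Vec::new();
    for head in 0..n {
        if has_prev[head] || next[head].is_none() {
            continue;
        }
        let mut chain = vec![head];
        let mut cur = head;
        while let Some(t) = next[cur] {
            chain.push(t);
            cur = t;
        }
        if chain.len() >= min_len {
            chains.push(chain);
        }
    }
    chains
}
```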

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Lift the notify=false restriction from chain fusion so operators like
inspect, probe, and unary_notify can be fused into chains. Frontier
changes are propagated to notify=true members via sparse
notify_frontiers (only allocated for members that need notification),
avoiding O(N²) overhead for chains where few members observe frontiers.

Tombstoned children are now added to the reachability builder with
(0, 0) inputs/outputs to preserve index positions.

The event_driven example gains a rounds parameter for benchmarking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Revert V2 notify=true fusion which had a correctness bug: frontier
propagation within fused chains did not correctly track capabilities
for notify=true operators, causing iterative computations (e.g.,
differential-dataflow's reduce) to fail to converge.

Add identity summary check to exclude operators with non-trivial
internal summaries (e.g., feedback operators with Product(0, 1))
from chains, even though they otherwise meet all fusion criteria.

Validated against differential-dataflow's full test suite.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Report ALL chain members' initial +peers capabilities to the chain's
SharedProgress, not just the last member's. Each member independently
drops its capability during execution, so all N members' initial caps
must be visible to the reachability tracker.

Remove dead notify=true fusion scaffolding (notify_frontiers, frontier
propagation in schedule()) since detect_chains excludes notify=true
operators. See commit message of 214e57f for rationale.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Wire fuse_chain_length into install_options/from_matches so it can be
set from the command line via --fuse-chain-length N.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

The `local` field on operators indicates whether progress information
is pre-circulated, not whether data flows through thread-local channels.
All regular operators return `local = true` regardless of their pact,
so the previous check didn't actually verify pipeline connectivity.

Add `is_pipeline()` to `ParallelizationContract` (default: false,
overridden to true for `Pipeline`), track it in `OperatorShape`, and
expose it through `Operate::pipeline()`. Chain detection checks the
target operator's `pipeline` in each link; the head's input pact is
irrelevant since it receives data from outside the chain.
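The default-method pattern this commit describes might look like the sketch below. `Pact`, `Shape`, and `links_are_pipeline` are simplified, hypothetical stand-ins for `ParallelizationContract`, `OperatorShape`, and the chain-detection check; only the `is_pipeline()` name and its default of `false` are taken from the message.

```rust
/// Hypothetical, simplified stand-in for a parallelization contract.
trait Pact {
    /// Whether data stays on the worker thread. Defaults to `false`, so any
    /// contract that does not opt in is treated as non-pipeline.
    fn is_pipeline(&self) -> bool {
        false
    }
}

struct Pipeline;
struct Exchange;

impl Pact for Pipeline {
    fn is_pipeline(&self) -> bool {
        true
    }
}

// Exchange keeps the default and is therefore never treated as a pipeline.
impl Pact for Exchange {}

/// Hypothetical slice of the operator shape: records the input pact's answer.
struct Shape {
    pipeline: bool,
}

impl Shape {
    fn with_pact<P: Pact>(pact: &P) -> Self {
        Shape { pipeline: pact.is_pipeline() }
    }
}

/// Checks every link of a candidate chain. Each link is judged by its target
/// operator, so the head (index 0) is skipped: its input comes from outside.
fn links_are_pipeline(chain: &[Shape]) -> bool {
    chain.iter().skip(1).all(|shape| shape.pipeline)
}
```

Defaulting to `false` is the conservative choice: a contract nobody has classified can block a fusion, but it cannot cause an incorrect one.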

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Replace the linear-chain-only fusion (1-input/1-output operators) with
general DAG subgraph fusion that handles arbitrary topologies: diamonds,
fan-in (concat), fan-out (branch), and mixed patterns.

Detection uses union-find over fusible edges instead of forward/reverse
chain walking. Fusibility constraints are unchanged per-operator
(!notify, identity summaries, has operator) but the 1-input/1-output
restriction is lifted. GroupScheduler replaces ChainScheduler, using
topological sort for member execution order and port maps for the
group's external interface.
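For readers unfamiliar with the technique, group detection via union-find over fusible edges can be sketched as below. This is an illustrative reduction, not the PR's `detect_groups`: the per-operator `fusible` flags are assumed to be precomputed, edges are plain (source, target) pairs, and any further constraints the real code applies are left out.

```rust
use std::collections::BTreeMap;

/// Minimal union-find with path halving.
struct UnionFind {
    parent: Vec<usize>,
}

impl UnionFind {
    fn new(n: usize) -> Self {
        UnionFind { parent: (0..n).collect() }
    }

    fn find(&mut self, mut x: usize) -> usize {
        while self.parent[x] != x {
            let grandparent = self.parent[self.parent[x]];
            self.parent[x] = grandparent;
            x = grandparent;
        }
        x
    }

    /// Attaches the larger root to the smaller one, so each group's root is
    /// its smallest member index (used here as the representative).
    fn union(&mut self, a: usize, b: usize) {
        let (ra, rb) = (self.find(a), self.find(b));
        if ra != rb {
            let (lo, hi) = (ra.min(rb), ra.max(rb));
            self.parent[hi] = lo;
        }
    }
}

/// Groups operators connected by edges whose endpoints are both fusible.
/// Returns groups with at least `min_size` members, ordered by representative.
fn detect_groups(fusible: &[bool], edges: &[(usize, usize)], min_size: usize) -> Vec<Vec<usize>> {
    let n = fusible.len();
    let mut uf = UnionFind::new(n);
    for &(s, t) in edges {
        if s != t && fusible[s] && fusible[t] {
            uf.union(s, t);
        }
    }
    let mut groups: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
    for i in 0..n {
        if fusible[i] {
            groups.entry(uf.find(i)).or_default().push(i);
        }
    }
    groups.into_values().filter(|g| g.len() >= min_size).collect()
}
```

With a diamond (branch at 1, arms 2 and 3, concat at 4) between a non-fusible source 0 and sink 5, the four inner operators end up in a single group, which forward/reverse chain walking would not find.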

Activation forwarding (forward_to on tombstoned members) ensures
pipeline channel activations reach the group representative even when
data arrives at a non-representative member through feedback edges.
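A rough sketch of how activation forwarding could resolve to a representative, assuming `forward_to` is stored as one `Option<usize>` per child (that layout and both function names are assumptions for illustration). Collecting into a `HashSet` means a group is scheduled once even when several of its members were activated.

```rust
use std::collections::HashSet;

/// Follows forward_to links from `index` until reaching a child that does
/// not forward. The loop is bounded by the number of children so that an
/// accidental cycle cannot spin forever.
fn resolve(forward_to: &[Option<usize>], mut index: usize) -> usize {
    for _ in 0..forward_to.len() {
        match forward_to[index] {
            Some(next) => index = next,
            None => break,
        }
    }
    index
}

/// Maps raw activations to the set of children that should actually run.
fn to_schedule(forward_to: &[Option<usize>], activated: &[usize]) -> HashSet<usize> {
    activated.iter().map(|&i| resolve(forward_to, i)).collect()
}
```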

The reachability computation uses a single reverse-topological pass
over the group's internal DAG, replacing the previous per-node BFS
that was O(n^2) in group size.
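The single-pass idea can be illustrated with the following sketch, which computes, for every member of a group, the set of group outputs reachable from it. The function names, the edge representation, and the `external` parameter (the group output ports each member feeds directly) are hypothetical; the real computation works with timely's port and summary types.

```rust
use std::collections::BTreeSet;

/// Kahn's algorithm. Returns None if the edges contain a cycle.
fn topo_order(n: usize, edges: &[(usize, usize)]) -> Option<Vec<usize>> {
    let mut in_deg = vec![0usize; n];
    let mut succ: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(s, t) in edges {
        succ[s].push(t);
        in_deg[t] += 1;
    }
    let mut ready: Vec<usize> = (0..n).filter(|&i| in_deg[i] == 0).collect();
    let mut order = Vec::with_capacity(n);
    while let Some(node) = ready.pop() {
        order.push(node);
        for &t in &succ[node] {
            in_deg[t] -= 1;
            if in_deg[t] == 0 {
                ready.push(t);
            }
        }
    }
    if order.len() == n { Some(order) } else { None }
}

/// For each member, the group outputs reachable from it. `external[i]` lists
/// the group output ports member `i` feeds directly and must have length `n`.
fn reachable_outputs(
    n: usize,
    edges: &[(usize, usize)],
    external: &[Vec<usize>],
) -> Option<Vec<BTreeSet<usize>>> {
    assert_eq!(external.len(), n);
    let order = topo_order(n, edges)?;
    let mut succ: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(s, t) in edges {
        succ[s].push(t);
    }
    let mut reach: Vec<BTreeSet<usize>> = external
        .iter()
        .map(|outs| outs.iter().copied().collect())
        .collect();
    // Reverse topological order: every successor is final before it is read,
    // so each edge is visited exactly once.
    for &node in order.iter().rev() {
        for &t in &succ[node] {
            let from_succ = reach[t].clone();
            reach[node].extend(from_succ);
        }
    }
    Some(reach)
}
```

Each edge is touched once, whereas a BFS started from every node revisits the same downstream members repeatedly.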

Adds event_driven_diamond benchmark and DAG fusion tests (diamond,
multi-input merge, branch, repeated diamonds, Collatz mutual recursion).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Add chapter 5.4 explaining how operator fusion works: fusibility
constraints, group detection, scheduling, capability mapping,
correctness argument for progress tracking, and configuration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Move ~600 lines of operator fusion code out of subgraph.rs into two new
modules: graph_pass.rs defines a GraphPass trait that receives children
and edges for in-place transformation, and fusion.rs implements it with
the existing detect_groups/fuse_group/GroupScheduler logic.

SubgraphBuilder gains a graph_passes Vec and add_graph_pass() method.
The build() method runs registered passes sequentially before building
the reachability tracker. worker.rs and child.rs register FusionPass
when fuse_chain_length >= 2.

PerOperatorState fields are now pub(crate) so the fusion module can
access them. The forward_to field and tombstone mechanism remain as a
general consequence of graph pass output, not fusion-specific.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

The representative's notify vector was not resized to match the fused
group's input count, causing index-out-of-bounds in propagate_pointstamps
when the group has more inputs than the original operator.
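The class of bug fixed here can be shown in miniature. In this sketch `OperatorState` is a hypothetical reduction of the per-operator state, the `bool` element type is an assumption, and filling with `false` is a guess based on fused members being !notify; the actual fix may differ in all three respects.

```rust
/// Hypothetical slice of the per-operator state relevant to the fix.
struct OperatorState {
    /// One entry per input port (element type assumed for illustration).
    notify: Vec<bool>,
}

/// Makes the representative's per-input vector match the fused group's
/// external input count, so later indexing by group input stays in bounds.
fn adopt_group_inputs(rep: &mut OperatorState, group_inputs: usize) {
    // Assumption: new entries are `false` because fused members are !notify.
    rep.notify.resize(group_inputs, false);
}
```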

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>